﻿#include <QCoreApplication>
#include <QVector>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <cmath>
#include <string>
#include <memory>
#include <QFile>
#include "cmdlineparser.h"
#include "tb_interface.h"
#include "resample.h"
using namespace TASKBUS;
const int OFFLINEDEBUG = 0;
//数据源方法
int do_resample(const cmdlineParser & args);
//全局的终止标记
static bool bfinished = false;
using namespace std;
int main(int argc , char * argv[])
{
	QCoreApplication a(argc, argv);
	//重要！设置输入输出为二进制！
	init_client();

	//解释命令行
	cmdlineParser args;
	if (OFFLINEDEBUG==0)
		args.parser(argc,argv);
	else
	{
		FILE * old_stdin, *old_stdout;
		auto ars = debug("/home/user/codes/build-taskbus-linux-Release/bin/debug/pid7852/",&old_stdin,&old_stdout);
		args.parser(ars);
	}

	int ret = 0;
	//每个模块要响应 --information参数,打印自己的功能定义字符串。或者提供一个json文件。
	if (args.contains("information"))
	{
		QFile fp(":/resample_pqfraction.json");
		if (fp.open(QIODevice::ReadOnly)==false)
		{
			fp.setFileName(":/json/filter_fir.json");
			fp.open(QIODevice::ReadOnly);
		}
		if (fp.isOpen())
		{
			QByteArray arr = fp.readAll();
			arr.push_back('\0');
			puts(arr.constData());
			fflush(stdout);
		}
		ret = 0;
	}
	else if (args.contains("function"/*,"filter_fir"*/))//正常运行模式
	{
		ret = do_resample(args);
	}
	else
	{
		fprintf(stderr,"Error:Function does not exits.");
		ret = -1;
	}

	return ret;
}

std::vector<float> input_f[2],output_f[2];
std::vector<short> out_cache;
int kUpfactor = 1, kDownfactor = 2;

unsigned int instance	  = 0;
unsigned int iinput	  = 0;
unsigned int i_tmin	  = 0;
unsigned int i_tmout	  = 0;
int iout = 0;


unsigned long long input_clk = 0, output_clk = 0;
double doutin_clock = 0, doutin_ratio = 1;
std::map<unsigned long long,std::vector<unsigned char> > timestamp_vec;
unsigned long long next_timestamps = 0;

bool tmstamp_exists = false;
bool tmstamp_begin =false;
void calc_resample()
{
	if (input_f[0].size())
		resample<float> ( kUpfactor, kDownfactor, input_f[0], output_f[0] );
	if (input_f[1].size())
		resample<float> ( kUpfactor, kDownfactor, input_f[1], output_f[1] );

	//缓存
	const int sizei = output_f[0].size();
	const int sizeq = output_f[1].size();
	for (int i =0;i<sizei;++i)
	{
		out_cache.push_back(output_f[0][i]);
		if (sizeq)
			out_cache.push_back(output_f[1][i]);
	}
	output_f[0].clear();
	output_f[1].clear();

}

void broadresult()
{
	//缓存
	bool q_existed = input_f[1].size()?true:false;
	const int spsize = q_existed?4:2;
	if (tmstamp_exists)
	{
		int sizei = out_cache.size()*2/spsize;
		int sizeq = out_cache.size()*2/spsize;
		while(doutin_clock + sizei * doutin_ratio >  next_timestamps)
		{
			//保持时续的一一对应
			long long commit_clk = output_clk;
			double commit_dclk = doutin_clock;
			for (int i =0;i<sizei;++i)
			{
				if (commit_dclk >= next_timestamps)
				{
					if (iout )
					{
						push_subject(iout,instance, i * spsize,
									 (unsigned char *)out_cache.data()
									 );
						out_cache.erase(out_cache.begin(),out_cache.begin()+i*spsize/2);
					}
					//播发老时戳的数据
					if (i_tmout && timestamp_vec.size())
						push_subject(i_tmout,instance,timestamp_vec.begin()->second.size(),
									 (unsigned char *)timestamp_vec.begin()->second.data()
									 );
					//新时戳
					if (timestamp_vec.size())
						timestamp_vec.erase(timestamp_vec.begin());
					next_timestamps = -1;//max
					if (timestamp_vec.size())
						next_timestamps = timestamp_vec.begin()->first;
					output_clk = commit_clk;
					doutin_clock = commit_dclk;
					break;
				}
				++commit_clk;
				commit_dclk += doutin_ratio;
			}
			sizei = out_cache.size()*2/spsize;
			sizeq = out_cache.size()*2/spsize;
		}

	}
	else
	{
		//播发
		if (iout&&out_cache.size())
		{
			push_subject(iout,instance,out_cache.size()*2,
						 (unsigned char *)out_cache.data()
						 );
		}
		out_cache.clear();
	}
}


int do_resample(const cmdlineParser & args)
{
	using namespace TASKBUS;
	int res = 0;
	//获得平台告诉自己的实例名
	instance  = args.toInt("instance",0);
	iinput	  = args.toInt("in",0);
	i_tmin	  = args.toInt("in_time",0);
	i_tmout	  = args.toInt("out_time",0);
	iout	  = args.toInt("out",0);
	const int np = args.toInt("np",2);
	const int nq = args.toInt("nq",1);
	//工作模式
	const int sptype	=	args.toInt("sptype",0);					fprintf(stderr,"sptype is %d.",sptype);
	int type	=	args.toInt("type",1);
	if (type<0 || type >1)
		type = 0;

	fflush(stderr);

	//计算翻转倍数
	kUpfactor = np;
	kDownfactor = nq;

	fprintf(stderr,"NP=%d,NQ=%d\n",np,nq);

	doutin_ratio = 1.0*kDownfactor/kUpfactor;
	int nBatchSize = kDownfactor;
	while (nBatchSize<1000)
		nBatchSize *= 2;

	input_f[0].resize(nBatchSize);
	if (type==1)
		input_f[1].resize(nBatchSize);


	try{
		//判断参数合法性
		if (instance==0)
			throw "function=quit;{\"error\":\"instance is 0, quit.\"}";
		int failed_header = 0;
		while (false==bfinished)
		{
			subject_package_header header;
			vector<unsigned char> packagedta = pull_subject(&header);
			if (is_valid_header(header)==false)
			{
				if (++failed_header>16)
					bfinished = true;
				continue;
			}
			if ( is_control_subject(header))
			{
				//收到命令进程退出的广播消息,退出
				if (strstr(control_subject(header,packagedta).c_str(),"function=quit;")!=nullptr)
					bfinished = true;
			}
			else if (header.subject_id==i_tmin && i_tmout)
			{
				tmstamp_exists = true;
				if (!tmstamp_begin )
				{
					if (i_tmout)
					push_subject(i_tmout,instance,packagedta.size(),
								 (unsigned char *)packagedta.data()
								 );
				}
				else
				{
					if (timestamp_vec.size()==0)
						next_timestamps = input_clk;
					timestamp_vec[input_clk] = std::move(packagedta);
					broadresult();
				}
				tmstamp_begin = true;
			}
			else if (header.subject_id == iinput)
			{
				//Input Buffer
				switch(sptype)
				{
				//Intel 16bit
				case 0:{
					if (type==0)
					{
						const short * pdata = (const short *) packagedta.data();
						const int samples = packagedta.size()/sizeof(short);
						for (int spid = 0;spid < samples; ++ spid)
						{
							input_f[0][input_clk % nBatchSize] = pdata[spid];
							++input_clk;
							if (input_clk % nBatchSize==0)
								calc_resample();
						}
					}
					else
					{
						const short (* pdata)[2] = (const short (*)[2]) packagedta.data();
						const int samples = packagedta.size()/sizeof(short)/2;
						for (int spid = 0;spid < samples; ++ spid)
						{
							input_f[0][input_clk % nBatchSize] = pdata[spid][0];
							input_f[1][input_clk % nBatchSize] = pdata[spid][1];
							++input_clk;
							if (input_clk % nBatchSize==0)
								calc_resample();
						}
					}

				}
					break;
					//16bit U
				case 1:{
					if (type==0)
					{
						const unsigned char * pdata = (const unsigned char *) packagedta.data();
						const int samples = packagedta.size()/sizeof(short);
						for (int spid = 0;spid < samples; ++ spid)
						{
							unsigned char pt[2] = {pdata[spid*2+1],pdata[spid*2]};
							short * ptv = (short *)pt;
							input_f[0][input_clk % nBatchSize] = *ptv;
							++input_clk;
							if (input_clk % nBatchSize==0)
								calc_resample();
						}
					}
					else
					{
						const unsigned char * pdata = (const unsigned char *) packagedta.data();
						const int samples = packagedta.size()/sizeof(short)/2;
						for (int spid = 0;spid < samples; ++ spid)
						{
							unsigned char pt[2][2] = {
								{pdata[spid*4+1],pdata[spid*4]},
								{pdata[2+spid*4+1],pdata[2+spid*4]}
													 };
							short * ptv[2] = {(short *)pt[0],(short *)pt[1]};
							input_f[0][input_clk % nBatchSize] = *(ptv[0]);
							input_f[1][input_clk % nBatchSize] = *(ptv[1]);
							++input_clk;
							if (input_clk % nBatchSize==0)
								calc_resample();
						}
					}

				}
					break;
					//int8
				case 2:{
					if (type==0)
					{
						const char * pdata = (const char *) packagedta.data();
						const int samples = packagedta.size()/sizeof(char);
						for (int spid = 0;spid < samples; ++ spid)
						{
							input_f[0][input_clk % nBatchSize] = pdata[spid];
							++input_clk;
							if (input_clk % nBatchSize==0)
								calc_resample();
						}
					}
					else
					{
						const char (* pdata)[2] = (const char ( *)[2]) packagedta.data();
						const int samples = packagedta.size()/sizeof(char)/2;
						for (int spid = 0;spid < samples; ++ spid)
						{
							input_f[0][input_clk % nBatchSize] = pdata[spid][0];
							input_f[1][input_clk % nBatchSize] = pdata[spid][1];
							++input_clk;
							if (input_clk % nBatchSize==0)
								calc_resample();
						}
					}

				}
					break;
				case 3:{
					if (type==0)
					{
						const unsigned char * pdata = (const unsigned char *) packagedta.data();
						const int samples = packagedta.size()/sizeof(char);
						for (int spid = 0;spid < samples; ++spid )
						{
							input_f[0][input_clk % nBatchSize] = ((short)(pdata[spid])-127)*128;
							++input_clk;
							if (input_clk % nBatchSize==0)
								calc_resample();
						}
					}
					else
					{
						const unsigned char (* pdata)[2] = (const unsigned char ( *)[2]) packagedta.data();
						const int samples = packagedta.size()/sizeof(char)/2;
						for (int spid = 0;spid < samples; ++ spid)
						{
							input_f[0][input_clk % nBatchSize] = ((short)(pdata[spid][0])-127)*128;
							input_f[1][input_clk % nBatchSize] = ((short)(pdata[spid][1])-127)*128;
							++input_clk;
							if (input_clk % nBatchSize==0)
								calc_resample();
						}
					}
				}
					break;
				default:
					break;
				}
				if (!tmstamp_exists)
					broadresult();

			}
		}


	}
	catch (const char * errMessage)
	{
		//向所有部位广播，偶要退出。
		push_subject(control_subect_id(),/*instance,broadcast_destin_id(),*/0,errMessage);
		fprintf(stderr,"Error:%s.",errMessage);
		fflush (stderr);
		res = -1;
	}

	return res;
}

